Server Pattern, perform transaction
concurrencyicon.icon
Performs transaction on each thread using async package
code:Transaction.hs
module Main where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async (async, wait)
import System.Random (randomRIO)
import Control.Monad (forever, when, join)
second :: Int
second = 1000000
transfer :: TVar Integer -> TVar Integer -> Integer -> STM (IO ())
transfer from to amount = do
current <- readTVar from
when (current < amount) retry
modifyTVar from (\x -> x - amount)
modifyTVar to (+amount)
return $ putStrLn $ "ok: " ++ show amount
transfer' :: TVar Integer -> TVar Integer -> TVar Integer -> Integer -> STM (IO ())
transfer' from from' to amount = transfer from to amount orElse transfer from' to amount
getTotal accs = sum <$> mapM readTVar accs
randomTransfer xs = do
let maxIndex = length xs - 1
from <- randomRIO (0, maxIndex)
to <- randomRIO (0, maxIndex)
amount <- randomRIO (-100, 100)
join $ atomically $ transfer (xs !! from) (xs !! to) amount -- transfer makes IO (IO ()), use join
monitor expected accs = forever $ do
actual <- atomically $ getTotal accs
when (actual /= expected) $
print $ "INVALID STATE: expected "
++ show expected
++ ", but have "
++ show actual
main :: IO ()
main = do
total <- atomically $ getTotal accs -- get total
print total
forkIO $ monitor total accs -- Provide monitor for each thread
replicateConcurrently_ 100000 (randomTransfer accs)
-- Perform random transaction, uses thread per transaction
join :: Monad m => m (m a) -> m a
The join function is the conventional monad join operator. It is used to remove one level of monadic structure, projecting its bound argument into the outer level.
retry :: STM a
Retry execution of the current memory transaction because it has seen values in TVars which mean that it should not continue (e.g. the TVars represent a shared buffer that is now empty). The implementation may block the thread until one of the TVars that it has read from has been udpated. (GHC only)
async library
We cannot easily wait for all threads to finish.
Use a TVar Bool to indicate whether a thread is finished.
Fortunately, all this is already done in the async package
If an exception in a thread is triggered but not caught, the thread will be stopped and the exception will propagate to the parent once it calls wait .
Control.Concurrent.Async link async :: IO a -> IO (Async a) --an improved forkIO
An asynchronous action spawned by async or withAsync. Asynchronous actions are executed in a separate thread, and operations are provided for waiting for asynchronous actions to complete and obtaining their results (see e.g. wait).
wait :: Async a -> IO a
Wait for an asynchronous action to complete, and return its value. If the asynchronous action threw an exception, then the exception is re-thrown by wait.
replicateConcurrently :: Int -> IO a -> IO [a]
Perform the action in the given number of threads.